package elastic

import scala.Predef.Map.apply
import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.immutable.List.apply
import play.api.Logger
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.index.query.QueryBuilder
import org.elasticsearch.common.xcontent.XContentFactory
import org.elasticsearch.index.query.QueryStringQueryBuilder
import org.elasticsearch.client.Requests


//http://www.elasticsearch.org/guide/reference/java-api/
//http://blog.trifork.nl/2012/09/13/elasticsearch-beyond-big-data-running-elasticsearch-embedded/
class RUCUpdater extends service.RUCUpdater {

  val client = ElasticRuc.client

//  def findRuc(ruc: String): Map[String, Any] = {
  def findRuc(ruc: String) : (String, Long, Long) = {
    val query = QueryBuilders.queryString(ruc + "*").field("ruc")
    val ret = find(query)
//    Logger.info("findRuc: " + ret)
    ret
  }

  def findContribuyente(name: String) : (String, Long, Long) = {
    val query = QueryBuilders.queryString(name + "*").field("name").defaultOperator(QueryStringQueryBuilder.Operator.AND)
    find(query)
  }

  def find(query: QueryBuilder) : (String, Long, Long) = {
    val requestBuilder =
      client.prepareSearch("ruc")
        .setTypes("ruc")
        .setQuery(query)
    val response = requestBuilder.execute().actionGet()
//    var docs = List[Map[String, String]]()
//    for (hit <- response.hits().getHits()) {
//      val source = hit.getSource()
//      val m = Map("ruc" -> source.get("ruc").toString(),
//        "dv" -> source.get("dv").toString(),
//        "name" -> source.get("name").toString())
//      docs = m :: docs
//    }
    def addAttr(source: java.util.Map[String, Object], name: String) = {
      "\"" + name + "\": \"" + source.get(name).toString() + "\""
    }
    var docs = "["
    var hitCount = response.hits().getHits().length
    for (hit <- response.hits().getHits()) {
      docs += "{" 
      val source = hit.getSource()
      docs += addAttr(source, "ruc") + ", "
      docs += addAttr(source, "dv") + ", "
      docs += addAttr(source, "name")
      docs += "}"
      if(hitCount > 1) {
        docs += ","
        hitCount -= 1
      }
    }
    docs += "]"
    val count = response.hits().getTotalHits()
    val responseTime = response.getTookInMillis()
//    Map("rucs" -> docs, "count" -> count.toString, "responseTime" -> responseTime.toString)
    val ret = "{ \"rucs\": " + docs + ", \"count\": \"" + count + "\", \"responseTime\": \"" + responseTime + "\"}"  
//    Logger.info("found: " + ret)
    (ret, count, responseTime)
  }

  def count: Long = client.prepareCount("ruc").execute().actionGet().count()

  def createIndex() = {
    val existsRequest = Requests.indicesExistsRequest("ruc")
    val existsResponse =
      client.admin().indices().exists(existsRequest).actionGet()
    Logger.info("existsResponse: " + existsResponse.exists())
    if (existsResponse.exists()) {
      val deleteRequest = Requests.deleteIndexRequest("ruc")
      val deleteResponse =
        client.admin().indices().delete(deleteRequest).actionGet()
      Logger.info("deleteResponse: " + deleteResponse)
    }

    import collection.JavaConverters._
    val mapping = Map("type" -> "string").asJava
    val createRequest = Requests.createIndexRequest("ruc")

    val createResponse =
      client.admin().indices().create(createRequest).actionGet()
    Logger.info("createResponse: " + createResponse)
  }

  override def deleteRucs() = {
    //    val deleteResponse = client.prepareDelete()
    //      .execute()
    //      .actionGet()
    //    Logger.info("deleteResponse: " + deleteResponse)
  }

  //  private def createTestDocuments() = {
  //    Map("RUC" -> "1", "Contribuyente" -> "Eins") ::
  //      Map("RUC" -> "2", "Contribuyente" -> "Zwei") ::
  //      List()
  //  }

  override def update() = {
    Logger.info("ElasticRuc: " + ElasticRuc.node.settings().get("node.name"))
    downloadRucs()
    unzip()
    deleteRucs()
    createIndex()
    processRucfiles()
    //    import com.codahale.jerkson.Json._
    //    val list = createTestDocuments()
    //    for (doc <- list) {
    //      val json = generate(doc)
    //      val indexRequest = Requests.indexRequest("ruc").`type`("ruc").source(json)
    //      val indexResponse = client.index(indexRequest).actionGet()
    //      Logger.info("indexResponse: " + indexResponse)
    //    }
  }

  override def processRucfiles() = {
    import scala.io.Source

    for (i <- 0 to 9) {
      val fileName = rucUpdatePath + "ruc" + i + ".txt"
      Logger.info("file: " + fileName)
      val lines = Source.fromFile(fileName).getLines
      val bulkRequest = client.prepareBulk() //EL
      lines.foreach { line =>
        var strArr = line.split("\\|");

        val jsonBuilder = XContentFactory.jsonBuilder()
        val ruc = strArr(0)
        bulkRequest.add(client.prepareIndex("ruc", "ruc", ruc)
          .setSource(jsonBuilder
            .startObject()
            .field("ruc", ruc)
            .field("name", strArr(1))
            .field("dv", strArr(2))
            .field("rucAntique", strArr(3))
            .endObject()))
      }
      val bulkResponse = bulkRequest.execute().actionGet();
      if (bulkResponse.hasFailures()) {
        // process failures by iterating through each bulk response item
        Logger.info("problem: " + bulkResponse.buildFailureMessage())
      }
    }

    val count = client.prepareCount("ruc").execute().actionGet()
    Logger.info("docs: " + count.count())

  }

  //  def count(): Long = client.prepareCount("ruc").execute().actionGet().count()
}